// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "exec/data-sink.h"

#include "runtime/descriptors.h"

namespace impala {

class DmlExecState;
class OutputPartition;
class TupleRow;

class TableSinkBaseConfig : public DataSinkConfig {
 public:
  void Close() override;

  /// Expressions for computing the target partitions to which a row is written.
  std::vector<ScalarExpr*> partition_key_exprs_;

  ~TableSinkBaseConfig() override {}
};

class TableSinkBase : public DataSink {
 public:
  TableSinkBase(TDataSinkId sink_id, const TableSinkBaseConfig& sink_config,
      const std::string& name, RuntimeState* state) :
      DataSink(sink_id, sink_config, name, state),
      table_id_(sink_config.tsink_->table_sink.target_table_id),
      partition_key_exprs_(sink_config.partition_key_exprs_) {}

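  /// Properties of the sink and its target table. Subclasses override these where
  /// applicable.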
  virtual bool is_overwrite() const { return false; }
  virtual bool is_result_sink() const { return false; }
  virtual int64_t write_id() const { return -1; }
  virtual std::string staging_dir() const { return ""; }
  virtual int skip_header_line_count() const { return 0; }
  virtual TSortingOrder::type sorting_order() const = 0;
  virtual const std::vector<int32_t>& sort_columns() const {
    static std::vector<int32_t> dummy;
    return dummy;
  }
  virtual const std::map<std::string, int64_t>& GetParquetBloomFilterColumns() const {
    static std::map<std::string, int64_t> dummy;
    return dummy;
  }

  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
  Status Open(RuntimeState* state) override;
  void Close(RuntimeState* state) override;

  RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
  RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; }
  RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
  RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; }
  RuntimeProfile::Counter* compress_timer() { return compress_timer_; }

  virtual std::string DebugString() const = 0;

 protected:
  /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by
  /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in
  /// the current batch to insert into the partition. The PartitionPair owns the
  /// OutputPartition via a unique_ptr so that the memory is freed as soon as the
  /// PartitionPair is removed from the map. This is important, because the
  /// PartitionPairs can have different lifetimes. For example, a clustered insert into a
  /// partitioned table iterates over the partitions, so only one PartitionPair is
  /// in the map at any given time.
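  /// Illustrative example of a single entry (key and indices are made up): a key such as
  /// "2024-01-01/us/" maps to a PartitionPair that owns the OutputPartition and holds the
  /// indices {0, 3, 7} of the rows in the current batch destined for that partition.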
  typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> PartitionPair;
  typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;

  /// Returns TRUE if the target table is transactional.
  bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); }

  virtual bool IsHiveAcid() const { return false; }

  /// Initialises the filenames of a given output partition, and opens the temporary file.
  /// The caller of this function must already have filled partition-related information
  /// in 'output_partition', such as 'iceberg_spec_id', 'partition_name',
  /// 'raw_partition_names', 'external_partition_name' for table types where these fields
  /// are applicable.
  /// If the partition will not have any rows added to it, 'empty_partition' must be true.
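  /// Illustrative caller sequence (simplified; values are examples only):
  ///   output_partition->partition_name = "year=2024/month=5";
  ///   RETURN_IF_ERROR(InitOutputPartition(state, part_desc, output_partition, false));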
  Status InitOutputPartition(RuntimeState* state,
      const HdfsPartitionDescriptor& partition_descriptor,
      OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT;

  /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
  /// The Hdfs directory is created from the target table's base Hdfs dir,
  /// the partition_key_names_ and the evaluated partition_key_exprs_.
  /// The Hdfs file name is the unique_id_str_.
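  /// For illustration only (the exact layout depends on the table and file format): with
  /// a base dir of hdfs://nn/warehouse/t and a partition value year=2024, the final file
  /// path has the form hdfs://nn/warehouse/t/year=2024/<unique_id_str_>..., while
  /// tmp_hdfs_file_name points at a corresponding path under the staging directory.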
  void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
      OutputPartition* output);

  /// Returns the ith partition name of the table.
  std::string GetPartitionName(int i);

  /// Directory names containing partition-key values need to be UrlEncoded, in
  /// particular to avoid problems when '/' is part of the key value (which might
  /// occur, for example, with date strings). Hive will URL decode the value
  /// transparently when Impala's frontend asks the metastore for partition key values,
  /// which makes it particularly important that we use the same encoding as Hive. It's
  /// also not necessary to encode the values when writing partition metadata. You can
  /// check this with 'show partitions <tbl>' in Hive, followed by a select from a
  /// decoded partition key value.
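  /// For example, a raw value "a/b" is encoded as "a%2Fb" so that the '/' does not
  /// introduce an extra directory level in the partition path.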
  std::string UrlEncodePartitionValue(const std::string& raw_str);

  /// Adds a temporary file to an output partition.  Files are created in a
  /// temporary directory and then moved to the real partition directory by the
  /// coordinator in a finalization step. The temporary file's current location
  /// and final destination are recorded in the state parameter.
  /// If this function fails, the tmp file is cleaned up.
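  /// Illustrative flow (paths are examples only): the new file is created under the
  /// staging directory, e.g. <staging dir>/<unique_id_str_>..., and is later moved by the
  /// coordinator into the final partition directory during finalization.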
  Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition)
      WARN_UNUSED_RESULT;

  /// Updates runtime stats of HDFS with rows written, then closes the file associated
  /// with the partition by calling ClosePartitionFile().
  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
      WARN_UNUSED_RESULT;

  /// Same as above, but for delete files; table sinks that write delete files have their
  /// own DmlExecState, which is passed in explicitly.
  Status FinalizeDeletePartitionFile(RuntimeState* state, OutputPartition* partition,
      DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;

  /// Writes all rows in 'batch' referenced by the row index vector in 'indices' to the
  /// partition's writer. If 'indices' is empty, then it writes all rows in 'batch'.
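  /// For example, indices = {0, 3, 5} writes only rows 0, 3 and 5 of 'batch', while the
  /// default empty vector writes every row of 'batch'.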
  Status WriteRowsToPartition(
      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
      const std::vector<int32_t>& indices = {})
      WARN_UNUSED_RESULT;

  /// Writes all rows in 'batch' to the partition's writer. Only used for writing delete
  /// files; table sinks that write delete files have their own DmlExecState, which is
  /// passed in explicitly.
  Status WriteDeleteRowsToPartition(
      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
      DmlExecState* dml_exec_state)
      WARN_UNUSED_RESULT;

  /// Closes the hdfs file for this partition as well as the writer.
  Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
      WARN_UNUSED_RESULT;

  /// Returns TRUE if the staging step should be skipped for this partition. This allows
  /// for faster INSERT query completion time for the S3A filesystem as the coordinator
  /// does not have to copy the file(s) from the staging location to the final location.
  /// We do not skip for INSERT OVERWRITEs because the coordinator will delete all files
  /// in the final location before moving the staged files there, so we cannot write
  /// directly to the final location and need to write to the temporary staging location.
  bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);

  /// Returns TRUE for Iceberg tables.
  bool IsIceberg() const { return table_desc_->IsIcebergTable(); }

  /// Returns TRUE if an external output directory was provided.
  bool HasExternalOutputDir() { return !external_output_dir_.empty(); }

  /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
  /// evaluated against 'row'. The generated string is much shorter than the full Hdfs
  /// file name.
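  /// Illustrative key (values are examples only): for dynamic partition values 2024 and
  /// "us" the key is a short concatenation such as "2024/us/" rather than the full path
  /// ".../year=2024/region=us/".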
  void GetHashTblKey(const TupleRow* row,
      const std::vector<ScalarExprEvaluator*>& evals, std::string* key);

  /// Table id resolved in Prepare() to set table_desc_.
  TableId table_id_;

  /// String representation of the unique fragment instance id. Used for per-partition
  /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare().
  std::string unique_id_str_;

  /// Descriptor of target table. Set in Prepare().
  const HdfsTableDescriptor* table_desc_ = nullptr;

  /// The partition descriptor used when creating new partitions from this sink.
  /// Currently we don't support multi-format sinks.
  const HdfsPartitionDescriptor* prototype_partition_ = nullptr;

  /// Expressions for computing the target partitions to which a row is written.
  const std::vector<ScalarExpr*>& partition_key_exprs_;
  std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;

  /// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
  /// Used for generating the string key of hash_tbl_.
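  /// For example, in INSERT INTO t PARTITION (year=2024, month) SELECT ..., the evaluator
  /// for the constant key 'year' is excluded while the evaluator for the dynamic key
  /// 'month' is included here.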
  std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_;

  /// Stores the current partition during clustered inserts across subsequent row batches.
  /// Only set if 'input_is_clustered_' is true.
  PartitionPair* current_clustered_partition_ = nullptr;

  /// Stores the current partition key during clustered inserts across subsequent row
  /// batches. Only set if 'input_is_clustered_' is true.
  std::string current_clustered_partition_key_;

  /// The directory in which an external FE expects results to be written to.
  std::string external_output_dir_;

  RuntimeProfile::Counter* partitions_created_counter_ = nullptr;
  RuntimeProfile::Counter* files_created_counter_ = nullptr;
  RuntimeProfile::Counter* rows_inserted_counter_ = nullptr;
  RuntimeProfile::Counter* bytes_written_counter_ = nullptr;

  /// Time spent converting tuples to the on-disk format.
  RuntimeProfile::Counter* encode_timer_ = nullptr;
  /// Time spent writing to Hdfs.
  RuntimeProfile::Counter* hdfs_write_timer_ = nullptr;
  /// Time spent compressing data.
  RuntimeProfile::Counter* compress_timer_ = nullptr;

 private:
  /// Writes rows to the partition's writer. Sets 'new_file' to true when it cannot write
  /// all rows to the current output file.
  Status WriteRowsToFile(
      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
      const std::vector<int32_t>& indices, bool *new_file) WARN_UNUSED_RESULT;

  Status FinalizePartitionFileImpl(RuntimeState* state, OutputPartition* partition,
      bool is_delete, DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
};

} // namespace impala
